-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20918 Add cursor-based low allocation optimized compaction implementation #4402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD - EPPv1 - all new code - Cursor compaction integration - JMH benchmarks for compaction and cursor impls - EPPv1 - New tests - Existing tests tweaks for new code - [revert?] change the default partitioner to expand testing of new code - [revert?] test data used some benchmarks - [revert?] jmh tweak GC settings for stability - [revert?] javadoc typos, marking unused params - [revert?] clarifying comment - [revert?] toString improvement - [revert?] remove spurious keywords - [revert?] marking metadata collection - [revert?] cursor verifier - Exclude SAI and counter column - Exclude BTI and legacy versions - Temporarily skip very long running test
{ | ||
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); | ||
public static final int MEGABYTE = 1024 * 1024 * 1024; | ||
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you move this to Config.java? One advantage of having it there is that the AST tests (like SingleNodeTableWalkTest) that generate random config would be able to exercise it. That's our best path to coverage with lots of different schemas and configurations.
If you can locally do a longer run of that test with cursor-compaction enabled, that would be useful too. That would be done via overriding StatefulASTBase#clusterConfig
with the new config set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.
Let's make sure that among test
, test-oa
and test-latest
we have at least one that is running with cursor compaction and one without.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make sure that among test, test-oa and test-latest we have at least one that is running with cursor compaction and one without.
How do I do that?
{ | ||
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); | ||
public static final int MEGABYTE = 1024 * 1024 * 1024; | ||
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.
Let's make sure that among test
, test-oa
and test-latest
we have at least one that is running with cursor compaction and one without.
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It must be stated that this approach that bundles all the steps of the processing in one single file will be quite difficult to maintain and keep in sync with the combination of iterators and transformations that we use in other parts of the code such as the query path. However, once we have reached a point of stability for a piece of functionality where we do not expect it to change significantly for a long time, it does makes sense to unpack the code and present it in a way that makes its execution as direct as possible, and this patch is a good such representation of the compaction process.
Personally, I am very unhappy about switching to mutable, pooled and reused objects, which are significantly more unwieldy and error prone, especially in contexts where concurrent access can occur. It seems this is becoming a necessity if we need to achieve acceptable performance with the current state of our heap usage, but we still need to very carefully separate the mutable versions of concepts from the immutable ones used throughout the code base. Suddenly making a DeletionTime
mutable is not an acceptable change.
First batch of targeted comments below, mainly going over CompactionCursor.java
.
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
// Merge any common normal rows | ||
int elementMergeLimit = partitionMergeLimit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace "element" with "unfiltered" throughout to conform with the existing nomenclature.
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
Outdated
Show resolved
Hide resolved
* likely at this stage (probably). | ||
* {@link Row.Merger#merge(DeletionTime)} | ||
*/ | ||
private boolean mergeRows(int rowMergeLimit, DeletionTime partitionDeletion, boolean isStatic, boolean isFirstElement) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionDeletion
here is misleading, rename to active
.
elementCount++; | ||
lastClustering = sstableCursors[0].rHeader; | ||
} | ||
if (activeOpenRangeDeletion == DeletionTime.LIVE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would feel safer if we used null
for no deletion. Since we already explicitly check for it every time, it would not be more complex than this.
Otherwise we run a (however small and carefully managed) chance of inadvertently modifying LIVE
, which will have devastating consequences.
continueReadingAfterMerge(elementMergeLimit, ELEMENT_END); | ||
} | ||
|
||
boolean partitionWritten = isPartitionStarted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that it would be simpler to move the decision of when to actually write the partition header to the writer. Are there any reasons to prefer to do it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next batch of comments.
{ | ||
out.writeByte(clustering.kind().ordinal()); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not quite clear if this really is unused. Do you rely on them being unused? If so, shouldn't we remove the versioning support for clusterings altogether (preferably in a separate commit)?
{ | ||
if (clusteringIndex % 32 == 0) | ||
{ | ||
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the manual offset tracking more efficient than advancing c1
and c2
with VIntCoding.readUnsignedVInt
?
ofst2 += vlen2; | ||
} | ||
// present > not present | ||
else if (v1Present && !v2Present) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The else
case can be done as
// null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
// compare swapped arguments to reverse the order
int cmp = Long.compare(clusteringBlock2 & 0b11, clusteringBlock1 & 0b11);
if (cmp != 0)
return cmp;
} | ||
if (!UnfilteredSerializer.hasAllColumns(flags)) | ||
{ | ||
// TODO: re-implement GC free |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataStax's branch has an implementation of it.
{ | ||
long finishResult = partitionWriter.finish(); | ||
// not inclusive of last byte | ||
long partitionLength = partitionWriter.finish(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not at all guaranteed to be the partition length, changing the name here is very misleading.
|
||
@Override | ||
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key) | ||
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to rename the parameter?
} | ||
|
||
public void updateClusteringValues(ClusteringDescriptor newClustering) { | ||
if (newClustering == null || newClustering.clusteringKind().isBoundary()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need to copy the comment from updateClusteringValuesByBoundOrBoundary
to explain skipping boundaries.
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); | ||
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); | ||
Slice coveredClustering; | ||
if (minClusteringDescriptor.clusteringKind() != ClusteringPrefix.Kind.EXCL_START_BOUND) // min is end only if the descriptors are unused |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The minimum can certainly be EXCL_START_BOUND
when it is used, if a partition starts with a range tombstone. The maximum, on the other hand, can't.
If you want to do this by a single operation (and also remove the minClusteringDescriptor.clusteringColumnsBound() == 0
check in updateClusteringValues
), you can change the uninitialized min kind to SSTABLE_UPPER_BOUND
, because that won't ever be given to updateClusteringValues
.
… elsewhere using seek
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next batch of comments.
return retval; | ||
} | ||
|
||
public static long getUnsignedVInt(byte[] input, int offset, int length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These don't appear to be used.
import org.apache.cassandra.dht.Murmur3Partitioner; | ||
import org.jctools.util.UnsafeAccess; | ||
|
||
public class ReusableLongToken extends Murmur3Partitioner.LongToken |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This shouldn't need to be public.
import org.apache.cassandra.utils.ByteArrayUtil; | ||
import org.apache.cassandra.utils.ByteBufferUtil; | ||
|
||
public class ReusableDecoratedKey extends DecoratedKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is pretty hacky. It shouldn't be hard to move the support for reusable tokens to the partitioner (throwing exceptions for all except Murmur and local).
private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); | ||
|
||
protected final long timestamp; | ||
protected long timestamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class can be easily converted to interface so that we don't have to make the base mutable.
* Base class for the sstable writers used by CQLSSTableWriter. | ||
*/ | ||
abstract class AbstractSSTableSimpleWriter implements Closeable | ||
public abstract class AbstractSSTableSimpleWriter implements Closeable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be made public.
/** common to rows/tombstones. Call continue(); for next element, or maybe partition end */ | ||
int ELEMENT_END = 1 << 8; | ||
/** at {@link UnfilteredSerializer#isEndOfPartition(int)} */ | ||
int PARTITION_END = 1 << 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious what the benefit of these END states is. Why can't we advance to the next entry directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
END state allows the implementation to part the cursors clearly before moving to the next iteration.
public interface State | ||
{ | ||
/** start of file, after partition end but before EOF */ | ||
int PARTITION_START = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that we don't know the key at partition/unfiltered/cell start leads to some odd-looking code in CompactionCursor
. Is there a benefit to delay the reading of the header until it is explicitly requested?
return state; | ||
} | ||
|
||
private int checkNextFlags() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller of this method appears to be pretty well aware what kind of flags/state it expects this to be called in. Would it make sense to split it into checkNext(Partition|Unfiltered|Cell)Flags
?
appendBIGIndex(partitionKey, partitionKeyLength, partitionStart, headerLength, partitionDeletionTime, partitionEnd); | ||
} | ||
|
||
private void appendBIGIndex(byte[] key, int keyLength, long partitionStart, int headerLength, DeletionTime partitionDeletionTime, long partitionEnd) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not easy to modify and reuse the index building code from BigFormatPartitionWriter
? The duplication here seems quite unnecessary.
private final int indexBlockThreshold; | ||
|
||
|
||
private SSTableCursorWriter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class should be split into a common SortedTableCursorWriter
, with format-specific subclasses that instantiate the index builders it uses, and placed into the correct per-format packages.
…nMerge` and return true when partitions remain
Javadoc for bubbleInsertToPreSorted
patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD
Thanks for sending a pull request! Here are some tips if you're new here:
Commit messages should follow the following format:
The Cassandra Jira